/*
 *  Filename:lsv_volume.c
 *  Description:
 *
 *  Created on: 2017年3月2日
 *  Author: Asdf(825674301)
 */

#include "config.h"

#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "net_global.h"
#include "md_map.h"
#include "volume_proto.h"
#include "controller_internal.h"

#include "lsv.h"

#include "lsv_conf.h"
#include "lsv_volume.h"
#include "lsv_volume_storage.h"
#include "lsv_volume_gc.h"
#include "lsv_help.h"
#include "lsv_chunk_pool.h"

#define LSV_VOLUME_GC 0

#ifdef LSV

extern lsv_volume_storage_operations lich_op;
lsv_volume_storage_operations *volume_storage_op = &lich_op;

#else

extern lsv_volume_storage_operations file_op;
lsv_volume_storage_operations *volume_storage_op = &file_op;

#endif

extern uint32_t lsv_feature;

static inline int _obj_offset_size_check(lsv_u32_t *offset, lsv_u32_t *size) {
        int ret = 0;
        if (*offset >= LSV_CHUNK_SIZE) {
                ret = EINVAL;
                DERROR("offset greater than LSV_CHUNK_SIZE,errno:%d\n", ret);
                YASSERT(0);
                goto ERR;
        }

        if (*offset + *size > LSV_CHUNK_SIZE) {
                *size = LSV_CHUNK_SIZE - *offset;
        }
ERR:
        return ret;
}

int lsv_volume_init(lsv_volume_proto_t *lsv_info, lsv_u32_t flag) {
        DINFO("flag %u\n", flag);

        if (LSV_SYS_CREATE == flag) {
                return lsv_volume_create(lsv_info);
        } else if (LSV_SYS_LOAD == flag) {
                // TODO core
                return lsv_volume_load(lsv_info);
        } else if (LSV_SYS_RECOVERY == flag) {
                return lsv_volume_load(lsv_info); //todo. for others rather than row3.
                return lsv_volume_recovery(lsv_info);
        } else {
                return EPERM;
        }
}

int lsv_volume_create(lsv_volume_proto_t *lsv_info)
{
        int ret = 0;

        ret = volume_storage_op->create(lsv_info);
        if (unlikely(ret)) {
                DERROR("storage create failed,errno:%d\n", ret);
                goto ERR;
        }

#if LSV_VOLUME_GC
        ret = lsv_volume_volgc_create(lsv_info);
        if (unlikely(ret)) {
                DERROR("volgc create failed,errno:%d\n", ret);
                goto ERR;
        }
#else
        ret = lsv_chunk_pool_init(lsv_info, 1);
        if (unlikely(ret)) {
                DERROR("volgc create failed,errno:%d\n", ret);
                goto ERR;
        }
#endif

        return 0;
ERR:
        return ret;
}

int lsv_volume_delete(lsv_volume_proto_t *lsv_info)
{
#if LSV_VOLUME_GC
        lsv_volume_volgc_delete(lsv_info);
#else 
        lsv_chunk_pool_destory(lsv_info);
#endif

        return volume_storage_op->delete(lsv_info);
}

int lsv_volume_flush(lsv_volume_proto_t *lsv_info)
{
        #if LSV_VOLUME_GC
        lsv_volume_volgc_flush(lsv_info);
        #endif

        return volume_storage_op->close(lsv_info);
}

int lsv_volume_load(lsv_volume_proto_t *lsv_info) {
        int ret = 0;

        ret = volume_storage_op->open(lsv_info);
        if (unlikely(ret)) {
                DERROR("open stroage err,errno:%d\n", ret);
                goto ERR;
        }

#if LSV_VOLUME_GC
        ret = lsv_volume_volgc_load(lsv_info);
        if (unlikely(ret)) {
                DERROR("upload volgc err,errno:%d\n", ret);
                goto ERR;
        }
#else
        ret = lsv_chunk_pool_init(lsv_info, 0);
        if (unlikely(ret)) {
                DERROR("upload volgc err,errno:%d\n", ret);
                goto ERR;
        }
#endif
        return 0;
ERR:
        return ret;
}

int lsv_volume_recovery(lsv_volume_proto_t *lsv_info)
{
        int ret = 0;

        ret = volume_storage_op->open(lsv_info);
        if (unlikely(ret)) {
                DERROR("open stroage err,errno:%d\n", ret);
                goto ERR;
        }
#if LSV_VOLUME_GC
        ret = lsv_volume_volgc_recovery(lsv_info);
        if (unlikely(ret)) {
                DERROR("recovery volgc err,errno:%d\n", ret);
                goto ERR;
        }
#else
        //nothing for recovery.
        ret = lsv_chunk_pool_init(lsv_info, 0);
        if (unlikely(ret)) {
                DERROR("upload volgc err,errno:%d\n", ret);
                goto ERR;
        }
#endif
        return 0;
ERR:
        return ret;
}

int lsv_volume_chunk_write(lsv_volume_proto_t *lsv_info, lsv_s8_t *buf, lsv_u8_t type, lsv_u32_t *chunk_id) {
        int ret = 0;
        lsv_u32_t this_chunk_id = LSV_CHUNK_NULL;

#if LSV_VOLUME_GC
        ret = lsv_volume_chunk_malloc(lsv_info, type, &this_chunk_id);
        if (unlikely(ret)) {
                DERROR("malloc chunk failed,errno:%d\n", ret);
                goto ERR1;
        }
#endif

        lsv_info->share.write_log_enter_count++;

        if (lsv_feature & LSV_FEATURE_LOG) {
                lsv_volume_io_t vio;
                lsv_volume_io_init(&vio, this_chunk_id, 0, LSV_CHUNK_SIZE, type);
                ret = lsv_volume_chunk_update(lsv_info, &vio, buf);
                if (unlikely(ret)) {
                        DFATAL("write chunk failed, errno %d chunk id %u\n", ret, this_chunk_id);
                        goto ERR2;
                }
        }

        *chunk_id = this_chunk_id;

        lsv_info->share.write_log_enter_count--;
        return 0;
ERR2:
        lsv_volume_chunk_free(lsv_info, type, this_chunk_id);
        lsv_info->share.write_log_enter_count--;
#if LSV_VOLUME_GC
ERR1:
#endif
        return ret;
}

int lsv_volume_chunk_malloc(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t *chunk_id)
{
        int ret = 0;

        lsv_info->share.malloc_chunk_enter_count++;

#if ENABLE_PROFILE
        struct timeval t1, t2;
        _gettimeofday(&t1, NULL);
#endif

#if LSV_VOLUME_GC
        ret = lsv_volume_volgc_malloc(lsv_info, type, chunk_id);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#else
        ret = lsv_chunk_pool_malloc(lsv_info, type, chunk_id);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif
        // ret = chunk_pool_select((chunk_pool *)lsv_info->chunk_pool, chunk_id);

#if ENABLE_PROFILE
        _gettimeofday(&t2, NULL);
        int64_t used = _time_used(&t1, &t2);
        DINFO("chunk_id %u used %ju ret %d\n", *chunk_id, used, ret);
        lsv_info->op_stat[MALLOC_CHUNK].count += 1;
        lsv_info->op_stat[MALLOC_CHUNK].used += used;
#endif

        lsv_info->share.malloc_chunk_enter_count--;
        return 0;
err_ret:
        lsv_info->share.malloc_chunk_enter_count--;
        return ret;
}

int lsv_volume_chunk_update(lsv_volume_proto_t *lsv_info, lsv_volume_io_t *vio, lsv_s8_t *buf)
{
        int ret = 0;
        lsv_u64_t lsv_volume_offset;

        DBUG("update chunk_id:%u offset:%u size:%u\n", vio->chunk_id, vio->offset, vio->size);

        ret = _obj_offset_size_check(&vio->offset, &vio->size);
        if (unlikely(ret)) {
                DERROR("Invalid argument, chunk_id %u offset:%u size:%u ret %d\n",
                       vio->chunk_id, vio->offset, vio->size, ret);
                GOTO(ERR, ret);
        }

        lsv_volume_offset = __chkid2voloffset(vio->chunk_id) + vio->offset;

        int retry = 0;
        while (1) {
                ret = volume_storage_op->write(lsv_info, buf, vio->type, lsv_volume_offset, vio->size);
                if (unlikely(ret)) {
                        retry++;
                        DFATAL("retry %d chunk_id %u off %u size %u ret %d\n", retry, vio->chunk_id,
                               vio->offset, vio->size, ret);
                        if (retry > 1) {
                                GOTO(ERR, ret);
                        }

                        continue; //important, write error. !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
                }

                // success
                break;
        }

        DBUG("end update chunk_id:%u offset:%u size:%u ret %d\n", vio->chunk_id, vio->offset, vio->size, ret);

        return 0;
ERR:
        return ret;
}

void lsv_volume_chunk_update_callback_(void *arg)
{
        int ret = 0;
        lsv_volume_chunk_rw_callback_ctx * ctx = (lsv_volume_chunk_rw_callback_ctx *) arg;

        ret = lsv_volume_chunk_update(ctx->lsv_info, ctx->vio, ctx->buf);

        if (ctx->callback) {
                ctx->callback(ret, ctx->callback_arg);
        }
        if (ctx->is_free) {
                free(ctx);
        }
}

int lsv_volume_chunk_update_callback(lsv_volume_proto_t *lsv_info, lsv_volume_io_t *vio, lsv_s8_t *buf,
                                     lsv_volume_chunk_rw_callback callback, void* callback_arg)
{

        int ret = 0;
        lsv_volume_chunk_rw_callback_ctx *ctx = NULL;

        ret = ymalloc((void **)&ctx, sizeof(lsv_volume_chunk_rw_callback_ctx));
        if (unlikely(ret)) {
                DERROR("malloc failed, ret %d\n", ret);
                ret = ENOMEM;
                goto ERR;
        }

        ctx->lsv_info = lsv_info;
        ctx->vio = vio;
        ctx->buf = buf;
        ctx->callback = callback;
        ctx->callback_arg = callback_arg;
        ctx->is_free = 1;

        DBUG("create a volume_chunk_update task\n");
        schedule_task_new("lsv_volume_chunk_update_callback_", lsv_volume_chunk_update_callback_, ctx, -1);

        return 0;
ERR:
        return ret;
}

int lsv_volume_chunk_read(lsv_volume_proto_t *lsv_info, lsv_u32_t chunk_id, lsv_s8_t *buf)
{
        return lsv_volume_chunk_read_data(lsv_info, LSV_THIS_VOL_INO, chunk_id, 0,
                                          LSV_CHUNK_SIZE, buf);
}

int lsv_volume_chunk_read_data_i(lsv_volume_proto_t *lsv_info, lsv_io_t *io, lsv_s8_t *buf)
{
        int ret = 0;

        DBUG("chunk_id %u, chunk_off %d, real_off %ju size %u\n", io->chunk_id,
              io->chunk_off, io->offset, io->size);

        ret = volume_storage_op->read(lsv_info, io->offset, io->size, buf);
        if (unlikely(ret)) {
                DFATAL("chunk id:%u,offset:%u,size:%u ret %d\n", io->chunk_id, io->chunk_off, io->size, ret);
                goto ERR;
        }

        return 0;
ERR:
        return ret;
}

int lsv_volume_chunk_read_data(lsv_volume_proto_t *lsv_info, uint64_t ino, lsv_u32_t chunk_id, lsv_u32_t offset,
                               lsv_u32_t size, lsv_s8_t *buf)
{
        int ret = 0;
        lsv_io_t io;

        if (ino == LSV_THIS_VOL_INO) {
                lsv_io_init(&io, &lsv_info->volume_proto->chkid, chunk_id, offset, size, 0);
        } else {
                chkid_t chkid;
                chkid.type = __VOLUME_CHUNK__;
                chkid.id = ino;
                chkid.idx = 0;
                lsv_io_init(&io, &chkid, chunk_id, offset, size, 0);
        }

        YASSERT(io.id.type == __VOLUME_CHUNK__);

        if (chkid_cmp(&lsv_info->volume_proto->chkid, &io.id) == 0) {
                ret = lsv_volume_chunk_read_data_i(lsv_info, &io, buf);
        } else {
                // TODO cross volume
                // step1: chkid -> nid
                nid_t nid;
                ret = md_map_getsrv(&io.id, &nid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                // step2: read
                buffer_t tmp_buf;
                mbuffer_init(&tmp_buf, 0);

                ret = volume_remote_io_read(&nid, &io, &tmp_buf);
                if (unlikely(ret)) {
                        mbuffer_free(&tmp_buf);
                        GOTO(err_ret, ret);
                }

                mbuffer_popmsg(&tmp_buf, buf, size);
                mbuffer_free(&tmp_buf);
        }

        return ret;
err_ret:
        return ret;
}

void lsv_volume_chunk_read_callback_(void *arg)
{
        int ret = 0;
        lsv_volume_chunk_rw_callback_ctx * ctx = (lsv_volume_chunk_rw_callback_ctx *) arg;

        // TODO error pointer
        ret = lsv_volume_chunk_read_data(ctx->lsv_info, LSV_THIS_VOL_INO, ctx->vio->chunk_id, ctx->vio->offset,
                                             ctx->vio->size, ctx->buf);

        if (ctx->callback) {
                ctx->callback(ret, ctx->callback_arg);
        }
        if (ctx->is_free) {
                free(ctx);
        }
}

int lsv_volume_chunk_read_callback(lsv_volume_proto_t *lsv_info, lsv_volume_io_t *vio, lsv_s8_t *buf,
                lsv_volume_chunk_rw_callback callback, void* callback_arg)
{
        int ret = 0;
        lsv_volume_chunk_rw_callback_ctx * ctx = NULL;

        ret = ymalloc((void **)&ctx, sizeof(lsv_volume_chunk_rw_callback_ctx));
        if (unlikely(ret)) {
                ret = ENOMEM;
                GOTO(ERR, ret);
        }

        ctx->lsv_info = lsv_info;
        ctx->vio = vio;
        ctx->buf = buf;
        ctx->callback = callback;
        ctx->callback_arg = callback_arg;
        ctx->is_free = 1;

        DBUG("create a volume_chunk_read task\n");
        schedule_task_new("lsv_volume_chunk_read_callback_", lsv_volume_chunk_read_callback_, ctx, -1);

        return 0;
ERR:
        return ret;
}


int lsv_volume_chunk_free(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t chunk_id)
{
#if LSV_VOLUME_GC
        return lsv_volume_volgc_free(lsv_info, type, chunk_id);
#else
        return lsv_chunk_pool_free(lsv_info, type, chunk_id);
#endif
}

/**
 *
 * @todo count > 1时，或有问题，导致无法free，空间得不到回收，一直野蛮增长
 *
 * @param lsv_info
 * @param type
 * @param count
 * @param chunk_id
 * @return
 */
int lsv_volume_chunk_malloc_batch(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t count, lsv_u32_t *chunk_id)
{
        int ret = 0;
#if LSV_VOLUME_GC
        ret = lsv_volume_volgc_malloc_batch(lsv_info, type, count, chunk_id);
#else  
        ret = lsv_chunk_pool_malloc_batch(lsv_info, type, count, chunk_id);
#endif

        DBUG("batch malloc count %u type %u\n", count, type);

        for (int i = 0; i < count; i++) {
                DBUG("batch malloc chunk_id %u\n", chunk_id[i]);
        }

        return ret;
}

int lsv_volume_chunk_free_batch(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_u32_t count, lsv_u32_t *chunk_id)
{
#if LSV_VOLUME_GC
        return lsv_volume_volgc_free_batch(lsv_info, type, count, chunk_id);
#else
        return lsv_chunk_pool_free_batch(lsv_info, type, count, chunk_id);
#endif
}
